MQ消息构造--学会分解问题
作者|赵力新
各位亲爱的朋友,本文小编将抛砖引玉,谈一谈在工作中遇到问题的处理思路,希望给遇到迷惑的朋友一点指引。
RocketMQ是阿里向Apache贡献的消息中间件,是一个开源的分布式消息传递和流式数据平台。
随着公司的体量、业务呈现指数式增长,技术层面开始面临着数据采集、数据异构、系统整合等诸多问题。由于RPC采用同步处理技术,在性能、健壮性、可扩展性上都存在着诸多缺点,消息队列以异步处理,非阻塞调用的技术模型悄然登场,并迅速成为分布式架构的宠儿。在消息队列的诸多模型中,RocketMQ以其消息不丢失、架构优势、高可用、高吞吐量等诸多优势被广泛应用。
其架构图如下:
主要分以下几个模块:
nameserv
是基于服务注册发现功能的无状态组件,支持独立部署。在RocketMQ架构体系中,属于重中之重。生产者从namesrv中获取可用的broker地址,将消息发送至broker;消费者从namesrv中获取可用的broker地址,从broker中拉取消息;broker定时向naemsrv发送心跳信息,维护可用broker地址。
broker
是基于高性能和低延迟的文件存储的无状态组件,支持独立部署。在RocketMQ架构体系中,属于核心级组件,负责存储所有的消息相关文件(包括消息文件,日志文件等等)。
生产者及消费者
是我们在实际编码过程中直接接触到的,也是RocketMQ直接暴露给编码人员的API接口,生产者和消费者支持分布式部署。生产者,产生消息的实例;消费者,接收消息进行消费的实例。
在开发自测或者测试工作中,经常会遇到这种情况,待测方处于消费者下游,需要收到一个RocketMQ消息(订单状态变更/商品状态变更),才能触发要测的流程(例子:订单状态变更为已收货->给买卖双方发送验机评价消息,商品状态变更为风控下架->给卖家发送退回商品的信息等)。为了更好的模拟待测业务场景,又避免构造太多和本次测试无关的数据,我们需要一个模拟生产MQ消息的工具。
OK,来搞一个模拟生产MQ消息的工具吧。
接到任务的时候先不要瑟瑟发抖,来跟小编做几个思考题~
思考:1.MQ消息是什么样子的?
从日志中抓取一个被包装后的MQ消息如下:
MQ Message由一些关键的信息组成:
(1)Topic:消息的业务逻辑分类,区分订单类的消息or库存类的消息;
(2)Tag:对Topic的进一步细化,标记同一业务模块中不同用途的消息;
(3)Body:承载消息的消息体;
(4)Keys:用于消息的存储和查询,此处可以用默认值;
思考:2.如何生产一条MQ消息?
业务代码依赖alibaba的RocketMQ-client jar包,主要过程很清晰,简要代码如下:
思考:3.模拟生产MQ消息的工具设计成什么样才便于大家使用呢?
如果是一个人用,写一段代码 、一个单元测试运行一下即可。但是为了降低多人使用时安装运行环境的成本,我们最好提供一个接口,一个页面,让使用者输入Topic/Tag/Body,点击发送按钮就能发送一条MQ消息了。
小编主要是提供后端接口的,刚刚好小编同时用Java和Python两种语言。如果用Java来提供接口,实现过程类似第二步生产MQ消息的代码截图,把参数提取出来即可。
(到这里,其实你的工具已经能够做好了。下边是一道附加题。)
如果你的后端是Python语言来写的,如何用Python来生产一条RocketMQ消息呢?
思考:4.Python能否生产RocketMQ消息?怎么生产?
通过查阅资料可知,RocketMQ提供了jar包,也有提供python第三方包,那么Python能否调用jar包,使用Java的API呢?
答案是肯定的,Python中有个JPype,可以启动JVM,方便地调用Jar包。使用过程如下:
1、通过startJVM()方法启动JVM,把依赖的Jar包路径通过"-Djava.ext.dirs="参数设置进去;
2、通过JPackage引入所需的Java package,可以直接调用Java类的接口;
DefaultMQProducer = JPackage('com.alibaba.rocketmq.client.producer').DefaultMQProducer
MQClientException = JPackage('com.alibaba.rocketmq.client.exception').MQClientException
SendResult = JPackage('com.alibaba.rocketmq.client.producer').SendResult
Message = JPackage('com.alibaba.rocketmq.common.message').Message
SENDSTATUS = JPackage('com.alibaba.rocketmq.client.producer').SendStatus
3、实例化producer,并启动它;
4、实例化Message;
5、通过producer的send()方法发送消息;
6、通过producer.shutdown()方法来关闭producer;
7、通过Jpype的shutdownJVM()方法来关闭JVM;
通过上述步骤可以看出,Python和Java中生产一个MQ消息是一样的,这个思想可以用于其他在Python中调用Jar包的场景。
好啦,通过小编的讲解,“弄一个发送MQ消息的工具” 是不是变得很简单了呢?
最后,小编要说,遇到问题多Google,其实最主要的问题,是知道自己下一步要做什么。牛人千千万,大多数问题都已经有人解决了,我们需要多查资料,站在牛人的肩膀上,多多思考。
往期精彩回顾